#include <pthread.h>

#include <glog/logging.h>
#include <glog/log_severity.h>

#include "dispatch.h"


// Storage for the process-wide dispatcher pointer; it stays null until
// MessageDispatcher::Instance() creates the object.
MessageDispatcher* MessageDispatcher::m_pInstance = nullptr;

// Creates a request carrying the given message type; the extension type
// starts out as 0.
MessageRequest::MessageRequest(int type)
	: msg_type_(type),
	  ext_type_(0)
{
}

// Emits one INFO log line every time a request object is destroyed.
MessageRequest::~MessageRequest()
{
	LOG(INFO) << "MessageRequest::~MessageRequest";
}

int MessageRequest::GetType(void) const
{
	return this->msg_type_;
}

int MessageRequest::ExtType(void) const
{
	//LOG(INFO)<<"Get MessageRequest::ExtType: "<<ext_type_;

	return this->ext_type_;
}

void MessageRequest::ExtType(int type)
{
	//LOG(INFO)<<"Set MessageRequest::ExtType: "<<type;
	
	this->ext_type_ = type;

	//LOG(INFO)<<"After Set MessageRequest::ExtType: "<<ext_type_;
}

int MessageRequest::Call(void *args)
{
	MessageDeliver *deliver = reinterpret_cast<MessageDeliver *>(args);

	if(deliver)
	{
		//LOG(INFO)<<"MessageRequest::Call, ext_type: "<<this->ExtType();
		
		deliver->Execute(this);
	}
	
	return 0;
}

/////////////////////////////////////////////////////////////////////////////////
// Builds a deliver for the given message type: records the type, allocates
// its MessageQueue, then starts a worker thread that runs GetRequest(this).
// The thread is detached immediately, so nothing ever joins it.
MessageDeliver::MessageDeliver(int type)
{
	SetType(type);
	queue_ = new MessageQueue;

	pthread_t worker;
	const int rc = pthread_create(&worker, nullptr, GetRequest, static_cast<void *>(this));
	if (rc == 0)
	{
		pthread_detach(worker);
	}
	// NOTE(review): a non-zero rc is dropped without any report; the deliver is
	// then left with a queue that no thread ever drains.
}

// Releases the queue allocated by the constructor.
//
// NOTE(review): the worker thread started by the constructor is detached and
// loops on queue_ with no exit path, and nothing here stops it. Destroying a
// deliver while that thread is alive presumably leaves it touching a freed
// queue -- confirm that delivers are never destroyed in practice.
MessageDeliver::~MessageDeliver(void)
{
	// delete on a null pointer is a no-op, so no explicit check is needed.
	delete this->queue_;
}


int MessageDeliver::GetType(void) const 
{ 
  return this->type_; 
} 

void MessageDeliver::SetType(int type)
{
  this->type_ = type;
}

// Queues a request on this deliver's MessageQueue, where the worker thread
// running GetRequest() picks it up.
//
// Returns whatever MessageQueue::Put() returns.
int MessageDeliver::Work(MessageRequest *mr)
{
	const int put_result = this->queue_->Put(mr);
	return put_result;
}

void *MessageDeliver::GetRequest(void* arg)
{
	MessageDeliver *md = reinterpret_cast<MessageDeliver *>(arg);

	while(1)
	{
		MessageRequest *mr = md->queue_->Get();
		if (mr)
		{
			CallRequest *req = reinterpret_cast<CallRequest *> (mr);
			
			//LOG(INFO)<<"MessageDeliver::GetRequest, mr: "<<mr;
			
			req->Call(md);

			if (false == mr->DelayRelease())
			{
				delete mr;
			}
		}
	}
}
/////////////////////////////////////////////////////////////////////////////////////////////
// Nothing to set up beyond default-constructing the members.
MessageQueue::MessageQueue() = default;

// No explicit cleanup; requests still sitting in the queue are not deleted here.
MessageQueue::~MessageQueue() = default;

// Appends a request to the queue and wakes one thread waiting in Get().
//
// Returns 0 in every case.
int MessageQueue::Put(MessageRequest *req)
{
	{
		// Hold the mutex only for the push itself.
		std::lock_guard<std::mutex> guard(mutex_);
		msg_queue_.push(req);
	}

	// Signal after the mutex is released so the woken thread can take it at once.
	cv_.notify_one();

	return 0;
}

// Removes and returns the request at the front of the queue, blocking on the
// condition variable for as long as the queue is empty.
MessageRequest * MessageQueue::Get()
{
	std::unique_lock<std::mutex> guard(mutex_);

	// The predicate form re-checks emptiness after every wake-up, so spurious
	// wake-ups simply go back to waiting.
	cv_.wait(guard, [this] { return !msg_queue_.empty(); });

	MessageRequest *const head = msg_queue_.front();
	msg_queue_.pop();

	// The mutex is released when guard goes out of scope.
	return head;
}

////////////////////////////////////////////////////////////////////////////
// Members are default-constructed; the deliver list starts out empty.
MessageDispatcher::MessageDispatcher() = default;

// No explicit cleanup; registered delivers are not deleted here.
MessageDispatcher::~MessageDispatcher() = default;

// Adds a deliver to the common-message list, keeping at most one deliver per
// message type.
//
// deliver_ptr: deliver to register. The list stores the raw pointer; the
//              object itself is not copied.
// Returns 0 when the deliver was appended, or -1 when deliver_ptr is null or a
// deliver with the same GetType() is already registered.
int MessageDispatcher::RegisterCommDeliver(MessageDeliver *deliver_ptr)
{
	// Reject null up front: the duplicate scan below dereferences the pointer,
	// and a null entry stored in the list would crash every later scan.
	if (deliver_ptr == nullptr)
	{
		return -1;
	}

	std::lock_guard<std::mutex> guard(lock_);

	const int new_type = deliver_ptr->GetType();
	for (MessageDeliver *registered : this->comm_msg_deliver_list_)
	{
		if (registered->GetType() == new_type)
		{
			return -1;
		}
	}

	this->comm_msg_deliver_list_.push_back(deliver_ptr);

	return 0;
}
 
// Removes a deliver from the common-message list.
//
// deliver_ptr: used only for its GetType(); the first registered deliver with
//              that type is removed, which need not be the same object. The
//              removed deliver is not deleted.
// Returns 0 when an entry was removed, or -1 when deliver_ptr is null or no
// registered deliver has that type.
int MessageDispatcher::UnRegisterCommDeliver(MessageDeliver *deliver_ptr)
{
	// A null argument has no type to match on; treat it as "not found" rather
	// than dereferencing it in the scan below.
	if (deliver_ptr == nullptr)
	{
		return -1;
	}

	std::lock_guard<std::mutex> guard(lock_);

	msg_deliver_list_t &registered = this->comm_msg_deliver_list_;
	const int target_type = deliver_ptr->GetType();

	for (msg_deliver_list_iter_t iter = registered.begin(); iter != registered.end(); ++iter)
	{
		if ((*iter)->GetType() == target_type)
		{
			// Returning straight after erase means the invalidated iterator is
			// never touched again.
			registered.erase(iter);
			return 0;
		}
	}

	return -1;
}
 
// Public dispatch entry point; forwards straight to DispatchCommMsg() and
// returns its result unchanged.
int MessageDispatcher::DispatchMsg(MessageRequest *msg_req)
{
	return this->DispatchCommMsg(msg_req);
}

// Dispatches a common message: hands the request to every registered deliver
// whose type equals the request's type.
//
// msg_req: request to dispatch. Once a deliver's Work() has accepted it, the
//          deliver's worker thread may delete it at any moment (see
//          MessageDeliver::GetRequest), so it must not be touched afterwards.
// Returns the number of delivers the request was handed to; 0 when msg_req is
// null or no deliver matches. In the 0 case no deliver has taken the request.
int MessageDispatcher::DispatchCommMsg(MessageRequest *msg_req)
{
	if (msg_req == nullptr)
	{
		return 0;
	}

	std::lock_guard<std::mutex> guard(lock_);

	// Read the type once, before any hand-off. Re-reading it from msg_req on
	// later iterations would race with the worker thread deleting the request.
	const int wanted_type = msg_req->GetType();

	int count = 0;
	for (MessageDeliver *deliver : this->comm_msg_deliver_list_)
	{
		if (deliver->GetType() == wanted_type)
		{
			deliver->Work(msg_req);
			++count;
		}
	}

	return count;
}

// Returns the shared dispatcher, creating it on first use.
//
// Creation is serialised with a function-local mutex so that two threads
// calling Instance() at the same time cannot both see a null m_pInstance and
// each allocate a dispatcher. The null check is kept (rather than caching the
// pointer in a local static) so that the object is still re-created if
// m_pInstance is ever reset to null elsewhere.
MessageDispatcher* MessageDispatcher::Instance()
{
	// Initialisation of a function-local static is itself thread-safe since C++11.
	static std::mutex creation_lock;

	std::lock_guard<std::mutex> guard(creation_lock);

	if (m_pInstance == nullptr)
	{
		m_pInstance = new MessageDispatcher();
	}

	return m_pInstance;
}


